import time
import sys
from threading import Thread, Lock
import Queue
import pika
import copy
from pika.exceptions import ConnectionClosed
import traceback
from common.log import info, warn, debug, error
from common.utils import Singleton

class RabbitmqService(Singleton):
    """
    RabbitMQ helper combining an asynchronous consumer and a background
    publisher.

    Consuming uses a ``pika.SelectConnection``; the chain of callbacks is::

        connect -> on_connection_open -> on_channel_open -> setup_exchange
                -> on_exchange_declareok -> setup_queue -> on_queue_declareok
                -> on_bindok -> start_consuming

    ``start_consuming`` is only reached through ``on_bindok``, so at least one
    exchange and one binding must be registered for consumption to begin.

    Delivered messages are acknowledged immediately and handed to a pool of
    worker threads through ``msgqueue``. Publishing is decoupled through
    ``publishqueue`` and performed by ``publish_worker`` on its own
    ``pika.BlockingConnection``.

    Typical usage (all registration happens before ``consuming_loop``):
    ``exchange_declare`` / ``queue_bind`` / ``regist_consume`` and then
    ``consuming_loop``.
    """

    # NOTE: every attribute below lives on the class, so the mutable ones
    # (lists, dicts, queues) are shared by all instances. The class derives
    # from Singleton, so this is presumably intentional -- verify before
    # creating more than one instance.

    # Connection/channel state of the asynchronous consumer.
    _connection = None
    _channel = None
    # True once stop() was requested; suppresses reconnection.
    _closing = False
    # Guards __init__ so repeated construction keeps the first settings.
    _inited = False
    # Consumer tags returned by basic_consume on the current channel.
    _consumer_tag_list = []
    # Number of Basic.Cancel requests still waiting for their cancel-ok.
    _pending_cancels = 0
    publish_thread = None
    basic_qos_kwargs = None
    # One dict per registered consumer (kwargs for basic_consume plus the
    # internal "consumed" bookkeeping flag).
    basic_consume_kwargs_list = []
    # Messages received from the broker, waiting for a worker thread.
    msgqueue = Queue.Queue()
    # (exchange, routing_key, message, kwargs) tuples waiting to be published.
    publishqueue = Queue.Queue()
    # queue name -> user callback
    qname_callback_map = {}
    # binding route key -> queue name
    rk_qname_bind_map = {}
    publish_exchange_kwargs = None
    exchange_kwargs_list = []
    # One dict per queue (kwargs for queue_declare plus the "declared" flag).
    queue_kwargs_list = []
    # One dict per binding request: exchange, queue, route_keys, "bound" flag.
    queue_bind_kwargs_list = []
    # True once the consumer-cancel callback was added to the current channel.
    cancel_added = False

    def __init__(self, host="localhost", port=5672,
                 username='guest', password='guest',
                 num_worker_threads=10):
        """
        Store the connection settings; no connection is opened here.

        Only the first construction has an effect (guarded by ``_inited``).

        :param host: rabbitmq host
        :param port: rabbitmq port
        :param username: rabbitmq user name
        :param password: rabbitmq password
        :param num_worker_threads: number of consumer worker threads
        """
        if self._inited:
            return
        self.num_worker_threads = num_worker_threads
        self._host = host
        self._port = port
        self._username = username
        self._password = password
        # "%2F" is the URL-encoded default virtual host "/".
        self._url = "amqp://%s:%s@%s:%d/" % (username, password, host, port) + "%2F"
        self._inited = True

    def __del__(self):
        """
        Stop the service when the object is garbage collected.
        :return:
        """
        self.stop()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _call_kwargs(spec, *bookkeeping_keys):
        """Return a copy of ``spec`` without the internal bookkeeping keys,
        suitable for passing to a pika call as ``**kwargs``."""
        return dict((key, value) for key, value in spec.items()
                    if key not in bookkeeping_keys)

    def _reset_channel_state(self):
        """Forget everything that was set up on a previous channel.

        Declarations, bindings, consumers and the cancel callback belong to
        one channel. Without this reset the flags set during the first
        connection would make every step a no-op after a reconnect.
        """
        self.cancel_added = False
        del self._consumer_tag_list[:]
        for queue_kwargs in self.queue_kwargs_list:
            queue_kwargs["declared"] = False
        for bind_spec in self.queue_bind_kwargs_list:
            bind_spec["bound"] = False
        for consume_spec in self.basic_consume_kwargs_list:
            consume_spec["consumed"] = False

    @staticmethod
    def _halt_thread(thread):
        """Mark ``thread`` as stopped and join it.

        NOTE(review): ``_Thread__stop`` is a private CPython 2 API; as far as
        I can tell it only flags the thread as stopped so that ``join()``
        returns, it does not interrupt the target function -- confirm.
        """
        thread._Thread__stop()
        thread.join()

    def _requeue_publish(self, elem):
        """Put a message that failed to publish back on ``publishqueue``.

        ``task_done`` balances the ``get`` that took ``elem`` off the queue,
        so the queue's unfinished-task counter does not grow on each retry.
        """
        if elem is None:
            return
        self.publishqueue.put(elem)
        self.publishqueue.task_done()

    @staticmethod
    def _close_quietly(connection):
        """Close a blocking connection left over from a failed publish round,
        logging (not raising) if it is already unusable."""
        if connection is None:
            return
        try:
            connection.close()
        except Exception:
            debug(traceback.format_exc())

    # ------------------------------------------------------------------
    # Connection handling
    # ------------------------------------------------------------------

    def connect(self):
        """
        Create the asynchronous connection.
        :return: the new ``pika.SelectConnection``
        """
        return pika.SelectConnection(pika.URLParameters(self._url),
                                     self.on_connection_open,
                                     stop_ioloop_on_close=False)

    def on_connection_open(self, unused_connection):
        """Connection is up: watch for close events and open a channel."""
        debug("Connection openning..")
        self.add_on_connection_close_callback()
        self.open_channel()

    def add_on_connection_close_callback(self):
        """Register ``on_connection_closed`` on the current connection."""
        self._connection.add_on_close_callback(self.on_connection_closed)

    def on_connection_closed(self, _connection, reply_code, reply_text):
        """
        Stop the ioloop on a requested shutdown, otherwise schedule a
        reconnect in 5 seconds.
        """
        self._channel = None
        if self._closing:
            self._connection.ioloop.stop()
        else:
            warn('Connection closed, reopening in 5 seconds: (%s) %s',
                           reply_code, reply_text)
            self._connection.add_timeout(5, self.reconnect)

    def reconnect(self):
        """Stop the old ioloop and, unless shutting down, connect again."""
        self._connection.ioloop.stop()

        if self._closing:
            return

        # Create a new _connection
        self._connection = self.connect()

        # There is now a new _connection, needs a new ioloop to run
        self._connection.ioloop.start()

    # ------------------------------------------------------------------
    # Channel / topology set-up (callback chain)
    # ------------------------------------------------------------------

    def open_channel(self):
        """Ask the connection for a channel; continues in on_channel_open."""
        debug("Channel openning...")
        self._connection.channel(on_open_callback=self.on_channel_open)

    def on_channel_open(self, channel):
        """
        Store the new channel, apply QoS and start declaring exchanges.
        :param channel: the channel that was opened
        """
        self._channel = channel
        # A new channel starts empty, also after a reconnect.
        self._reset_channel_state()
        if self.basic_qos_kwargs:
            self._channel.basic_qos(**self.basic_qos_kwargs)
        self.add_on_channel_close_callback()
        self.setup_exchange()

    def add_on_channel_close_callback(self):
        """Register ``on_channel_closed`` on the current channel."""
        self._channel.add_on_close_callback(self.on_channel_closed)

    def on_channel_closed(self, channel, reply_code, reply_text):
        """A closed channel takes the whole connection down with it."""
        self._connection.close()

    def setup_exchange(self):
        """Declare every registered exchange on the current channel."""
        debug("Declare exchage...")
        for exchange_kwargs in self.exchange_kwargs_list:
            exchange_kwargs.setdefault("callback", self.on_exchange_declareok)
            self._channel.exchange_declare(**exchange_kwargs)

    def on_exchange_declareok(self, unused_frame):
        """Exchange declared; continue with the queues."""
        self.setup_queue()

    def setup_queue(self):
        """
        Declare every registered queue that is not yet declared on the
        current channel. Called once per declared exchange, hence the flag.
        """
        debug("Declare queue...")
        for queue_kwargs in self.queue_kwargs_list:
            if queue_kwargs.get("declared"):
                debug("Queue %s has declared." % (queue_kwargs.get("queue"),))
                continue

            queue_kwargs.setdefault("callback", self.on_queue_declareok)
            # "declared" is our own flag and must not reach pika.
            self._channel.queue_declare(
                **self._call_kwargs(queue_kwargs, "declared"))
            queue_kwargs["declared"] = True

    def on_queue_declareok(self, method_frame):
        """
        Issue all pending bindings. Called once per declared queue; each
        binding request is only sent once per channel.
        """
        for bind_spec in self.queue_bind_kwargs_list:
            if bind_spec.get("bound") or "route_keys" not in bind_spec:
                continue
            # Flag first so a re-entrant call cannot send the binds twice.
            bind_spec["bound"] = True
            exchange = bind_spec["exchange"]
            queue = bind_spec["queue"]
            for route_key in bind_spec["route_keys"]:
                info('Binding %s to %s with %s'%
                    (exchange, queue, route_key))
                self._channel.queue_bind(callback=self.on_bindok,
                                         exchange=exchange,
                                         queue=queue, nowait=False,
                                         routing_key=route_key)

    def on_bindok(self, unused_frame):
        """Binding confirmed; make sure the consumers are running."""
        self.start_consuming()

    # ------------------------------------------------------------------
    # Consuming
    # ------------------------------------------------------------------

    def start_consuming(self):
        """
        Start every registered consumer that is not yet consuming on the
        current channel. Called once per confirmed binding.
        """
        if not self.cancel_added:
            info('Issuing consumer related commands')
            self.add_on_cancel_callback()
            self.cancel_added = True

        for consume_spec in self.basic_consume_kwargs_list:
            if consume_spec.get("consumed"):
                continue
            consume_spec.setdefault("consumer_callback", self.consumer_callback)
            # "consumed" is our own flag and must not reach pika.
            consumer_tag = self._channel.basic_consume(
                **self._call_kwargs(consume_spec, "consumed"))
            self._consumer_tag_list.append(consumer_tag)
            consume_spec["consumed"] = True

    def stop_consuming(self):
        """
        Stop the worker threads, wait for ``msgqueue`` to drain and cancel
        all consumers on the broker.
        """
        if self.publish_thread:
            self._halt_thread(self.publish_thread)

        # multiple_pool only exists after start_worker_thread() ran.
        for worker in getattr(self, "multiple_pool", ()):
            self._halt_thread(worker)
        self.msgqueue.join()

        if not self._channel:
            return

        info('Sending a Basic.Cancel command to RabbitMQ')
        consumer_tags = list(self._consumer_tag_list)
        self._pending_cancels = len(consumer_tags)
        if not consumer_tags:
            # Nothing to cancel, so no cancel-ok will ever close the channel.
            self.close_channel()
            return
        for consumer_tag in consumer_tags:
            self._channel.basic_cancel(self.on_cancelok, consumer_tag)

    def on_cancelok(self, unused_frame):
        """
        One consumer was cancelled; close the channel after the last one so
        the channel is closed exactly once.
        """
        info('RabbitMQ acknowledged the cancellation of the consumer')
        self._pending_cancels -= 1
        if self._pending_cancels <= 0:
            self.close_channel()

    def close_channel(self):
        """Close the current channel, if there is one."""
        info('Closing the channel')
        if self._channel:
            self._channel.close()

    def add_on_cancel_callback(self):
        """Register ``on_consumer_cancelled`` on the current channel."""
        self._channel.add_on_cancel_callback(self.on_consumer_cancelled)

    def on_consumer_cancelled(self, method_frame):
        """The broker cancelled a consumer: close the channel."""
        info('Consumer was cancelled remotely, shutting down: %r', method_frame)
        if self._channel:
            self._channel.close()

    # ------------------------------------------------------------------
    # Registration API (call before consuming_loop)
    # ------------------------------------------------------------------

    def channel_create(self, **kwargs):
        """
        Record channel settings. Only ``prefetch_count`` is used; it is
        applied through ``basic_qos`` when the channel opens.
        """
        # prefetch_count
        if "prefetch_count" in kwargs:
            self.basic_qos_kwargs = {"prefetch_count":kwargs["prefetch_count"]}

    def exchange_declare(self, **kwargs):
        """
        Register an exchange to be declared when the channel opens.

        :param exchange: exchange name (required; an error is logged and
            nothing is registered without it)
        :param type: exchange type, stored as ``exchange_type``; defaults to
            "topic" when neither ``type`` nor ``exchange_type`` is given
        :param is_publish_exchange: when true, the exchange is also used by
            the publish worker
        :param kwargs: everything else is passed to the channel's
            ``exchange_declare``
        """
        if "exchange" not in kwargs:
            error("Failed to get exchange name.")
            return

        if "type" in kwargs:
            kwargs["exchange_type"] = kwargs.pop("type")
        else:
            # Keep an explicitly passed exchange_type instead of overwriting.
            kwargs.setdefault("exchange_type", "topic")

        # Always strip our own flag, even when it is False, so it is never
        # forwarded to pika.
        if kwargs.pop("is_publish_exchange", False):
            self.publish_exchange_kwargs = copy.deepcopy(kwargs)
        self.exchange_kwargs_list.append(kwargs)

    def queue_bind(self, exchange=None, route_keys=None, queue_name=None, **kwargs):
        """
        Register a queue and, when an exchange is given, its bindings.

        :param exchange: exchange to bind to; no binding is registered if
            omitted
        :param route_keys: iterable of routing keys to bind with
        :param queue_name: name of the queue
        :param kwargs: extend arguments for the queue settings; only
            ``exclusive``, ``durable`` and ``arguments`` are used.
        :return: queue name
        """
        # Copy so neither a shared default nor the caller's list is aliased.
        route_keys = list(route_keys) if route_keys is not None else []

        queue_kwargs = {}
        if queue_name:
            queue_kwargs['queue'] = queue_name
        for option in ("exclusive", "durable", "arguments"):
            if option in kwargs:
                queue_kwargs[option] = kwargs[option]
        self.queue_kwargs_list.append(queue_kwargs)

        if exchange:
            self.queue_bind_kwargs_list.append({
                "exchange": exchange,
                "queue": queue_name,
                "route_keys": route_keys,
            })
            for route_key in route_keys:
                self.rk_qname_bind_map[route_key] = queue_name

        return queue_name

    def regist_consume(self, queue_name, callback):
        """
        Register ``callback(ch, method, properties, body)`` for a queue.

        :param queue_name: queue to consume from
        :param callback: message handler; must not be None
        """
        if callback is None:
            error("Consumer callback function should not be None.")
            return
        self.qname_callback_map[queue_name] = callback
        self.basic_consume_kwargs_list.append({"queue": queue_name})

    # ------------------------------------------------------------------
    # Message handling
    # ------------------------------------------------------------------

    def consumer_callback(self, ch, method, properties, body):
        """
        Delivery callback: queue the message for the worker threads and
        acknowledge it right away, whether or not queuing succeeded.
        """
        try:
            self.msgqueue.put((ch, method, properties, body))
        except Exception as ex:
            # if some exception happened we record the exception.
            error(traceback.format_exc())
            error("Some exceptions happened due to %s. The message is %s" %
                  (ex, body))
        finally:
            # no matter what happened we mark the message we
            # received as consumed
            self.acknowledge_message(method.delivery_tag)

    def acknowledge_message(self, delivery_tag):
        """
        Ack the delivery; if that fails, log it and try to nack instead.
        :param delivery_tag: delivery tag of the message
        """
        try:
            self._channel.basic_ack(delivery_tag=delivery_tag)
        except Exception as ex:
            error(traceback.format_exc())
            error("Ack error due to %s." % (ex))
            try:
                self._channel.basic_nack(delivery_tag=delivery_tag)
            except Exception:
                # The channel is most likely gone; do not let this escape
                # into the caller's finally block.
                error(traceback.format_exc())

    def multiple_thread_worker(self):
        """
        Worker thread body: take messages from ``msgqueue`` and dispatch them
        to the callback registered for the queue bound to the routing key.

        The lookup uses the message's routing key verbatim, so it only finds
        a queue when that key equals a registered binding key.
        """
        while True:
            elem = self.msgqueue.get()
            # Reset per message so the error log never shows a stale body.
            body = None
            try:
                ch, method, properties, body = elem
                qname = self.rk_qname_bind_map[method.routing_key]
                callback = self.qname_callback_map[qname]
                callback(ch, method, properties, body)
            except Exception as ex:
                # the message was already acked; just record the failure
                error(traceback.format_exc())
                error("Some exceptions happened due to %s. The message is %s" %
                      (ex, body))
            finally:
                # to avoid queue blocked
                self.msgqueue.task_done()

    # ------------------------------------------------------------------
    # Life cycle
    # ------------------------------------------------------------------

    def start_worker_thread(self):
        """Start the publish thread and the pool of consumer workers."""
        self.publish_thread = Thread(target=self.publish_worker)
        self.publish_thread.daemon = True
        self.publish_thread.start()

        self.multiple_pool = []
        for _ in range(self.num_worker_threads):
            worker = Thread(target=self.multiple_thread_worker)
            worker.daemon = True
            worker.start()
            # try some counts to wait the thread alive
            for _attempt in range(3):
                if worker.is_alive():
                    break
                warn("event worker thread is not ready, waitting..")
                time.sleep(2)
            else:
                warn("event worker can not be ready.")
            self.multiple_pool.append(worker)
        info("Multiple thread consumer worker started...")

    def consuming_loop(self):
        """Connect, start the threads and run the ioloop (blocks)."""
        self._connection = self.connect()
        self.start_worker_thread()
        self._connection.ioloop.start()

    def stop(self):
        """
        Shut down: stop consuming, then run the ioloop again so the cancel
        and close handshakes with the broker can complete.
        """
        info('Stopping')
        self._closing = True
        self.stop_consuming()
        # No connection means consuming_loop() never ran; nothing to wait for.
        if self._connection is not None:
            self._connection.ioloop.start()
        info('Stopped')

    # ------------------------------------------------------------------
    # Publishing
    # ------------------------------------------------------------------

    def publish_worker(self):
        """
        Publish thread body: poll ``publishqueue`` and send everything in it
        over a short-lived blocking connection.

        A message whose publish failed is put back on the queue. After more
        than 10 consecutive unexpected failures the thread exits.

        :raises Exception: if no publish exchange was declared
        """
        info("[x] publish worker start...")
        if not self.publish_exchange_kwargs:
            err_msg = "Please declare a exchange for publish, use is_publish_exchange to specify the exchange with exchange_declare."
            error(err_msg)
            raise Exception(err_msg)

        # The blocking exchange_declare takes no callback.
        self.publish_exchange_kwargs.pop("callback", None)
        credentials = pika.PlainCredentials(self._username, self._password)
        crash_try_count = 0

        while True:
            # elem is only non-None while a message is taken but not yet sent.
            elem = None
            connection = None
            try:
                while self.publishqueue.empty():
                    crash_try_count = 0
                    time.sleep(3)

                connection = pika.BlockingConnection(pika.ConnectionParameters(
                    host=self._host, port=self._port,
                    heartbeat=0,
                    credentials=credentials,
                    blocked_connection_timeout=3000))
                channel = connection.channel()
                channel.exchange_declare(**self.publish_exchange_kwargs)
                while not self.publishqueue.empty():
                    elem = self.publishqueue.get(block=False)
                    exchange, routing_key, message, kwargs = elem
                    channel.basic_publish(exchange=exchange, routing_key=routing_key, body=message, **kwargs)
                    info(" [x] Sent %r" % (message,))
                    self.publishqueue.task_done()
                    # Sent: a later failure must not re-queue it.
                    elem = None

                channel.close()
                connection.close()
                connection = None

            except ConnectionClosed as ex:
                error(traceback.format_exc())
                info("Comsumer _connection closed due to %s, try again ..." % (ex))
                # try to publish it again
                self._requeue_publish(elem)
                time.sleep(1)
            except Exception as ex:
                error(traceback.format_exc())
                info("Some other exception happened, try count %d." % (crash_try_count))
                # try to publish it again
                self._requeue_publish(elem)
                time.sleep(1 + crash_try_count)
                crash_try_count += 1
                if crash_try_count > 10:
                    error("Failed to try, the exception can not avoid, exit with failure.")
                    # NOTE: raised in this worker thread, SystemExit ends the
                    # thread only, not the whole process.
                    sys.exit(1)
            finally:
                # Only non-None when the round failed after connecting.
                self._close_quietly(connection)

    def publish(self, exchange, routing_key, message, **kwargs):
        """
        Queue a message for the publish worker.

        :param exchange: target exchange
        :param routing_key: routing key
        :param message: message body
        :param kwargs: extra arguments for ``basic_publish``
        """
        self.publishqueue.put((exchange, routing_key, message, kwargs))

    def sleep(self, duration):
        """Block the calling thread for ``duration`` seconds."""
        time.sleep(duration)
